#!/usr/bin/env bash
# db-unload -- Unloads a SQL query to a set of sinks
# > db-unload QUERY FORMAT SINK...
#
# When DEEPDIVE_UNLOAD_MATERIALIZED environment is set to `false`, skips
# materializing JOIN queries before unloading in parallel.  On the other hand,
# when it is set to `true`, parallel unloads will unconditionally materialize
# given QUERY before unloading.  Defaults to materializing only JOIN queries.
##
set -eu

: ${DEEPDIVE_UNLOAD_MATERIALIZED:=}

[[ $# -gt 0 ]] || usage "$0" "Missing QUERY"
query=$1; shift
[[ $# -gt 0 ]] || usage "$0" "Missing FORMAT"
format=$1; shift
[[ $# -gt 0 ]] || usage "$0" "Missing SINK"

STEP() { echo >&2 "$@"; }

case $format in
    tsj|tsv|csv)
        nsinks=$#
        if [[ $nsinks -gt 1 ]]; then
            STEP "unloading with $nsinks parallel connections: $(escape4sh "$query")"
            i=1
            queryHasJOINs=false
            tmpTable="($query) R"
            tmpTableWasMaterialized=false
            # materialize the query first if it contains joins
            case "$(tr a-z A-Z <<<"$query")" in *"FROM"*","*|*"JOIN"*) queryHasJOINs=true;; esac
            if ${DEEPDIVE_UNLOAD_MATERIALIZED:-false} ||
                    [[ -z $DEEPDIVE_UNLOAD_MATERIALIZED ]] && $queryHasJOINs; then
                tmpTable="dd_unload_$(sha1sum <<<"$query" | cut -b1-40)"
                STEP "materializing query result before parallel unload (export DEEPDIVE_UNLOAD_MATERIALIZED=false to skip)"
                db-create-table-as "$tmpTable" "$query"
                tmpTableWasMaterialized=true
            elif $queryHasJOINs; then
                STEP "skipping materializing JOIN query before parallel unload as DEEPDIVE_UNLOAD_MATERIALIZED isn't true"
            fi
            STEP "partitioning query result into $nsinks for parallel unload"
            while read limit offset end; do
                sink=$1; shift
                show_progress output_from "unloading [$i/$nsinks]" -- \
                db-query "SELECT * FROM $tmpTable LIMIT $limit OFFSET $offset" "$format" 0 >"$sink" &
                let ++i
            done < <(partition_integers $(db-query "SELECT COUNT(*) FROM $tmpTable" "$format" 0) $nsinks)
            wait
            ! $tmpTableWasMaterialized ||
                db-execute "DROP TABLE IF EXISTS $tmpTable CASCADE" 2>/dev/null || true
        else
            sink=$1; shift
            STEP "unloading to $sink: $(escape4sh "$query")"
            exec show_progress output_from "unloading" -- \
                db-query "$query" "$format" 0 >"$sink"
        fi
        ;;
    *) error "$format: unsupported format by PostgreSQL driver" ;;
esac
